Agenda

Announcements

  • Ch 17 programming notebook not assigned, but highly recommended
  • several steps take some non-trivial configuration that I don’t want to require per se

MDSR Ch 17: Towards big data

Biggest of the big… (for perspective)

What happens when data start getting big (in R)

Memory limits in R

Strategies when data get big

If things are just slow…

Recall the teller simulation:


any_active <- function(df) {
  # return TRUE if someone has not finished
  return(max(df$endtime) == Inf)
}

next_customer <- function(df) {
  # returns the next customer in line
  res <- filter(df, endtime == Inf) %>%
    arrange(arrival)
  return(head(res, 1))
}

update_customer <- function(df, cust_num, end_time) {
  # sets the end time of a specific customer
  return(mutate(df, endtime = ifelse(custnum == cust_num, end_time, endtime)))
}


teller_sim <- function(n = 1/2, m = 3/2, hours = 6) {
  # simulation of bank where there is just one teller
  # n: expected number of customers per minute
  # m: expected length of transaction is m minutes
  # hours: bank open for this many hours
  
  customers <- rpois(hours * 60, lambda = n)
  arrival <- numeric(sum(customers))
  position <- 1
  for (i in 1:length(customers)) {
    numcust <- customers[i]
    if (numcust != 0) {
      arrival[position:(position + numcust - 1)] <- rep(i, numcust)
      position <- position + numcust
    }
  }
  duration <- rexp(length(arrival), rate = 1/m)  # E[X]=m
  df <- data.frame(arrival, duration, custnum = 1:length(duration), 
                   endtime = Inf, stringsAsFactors = FALSE)
  
  endtime <- 0 # set up beginning of simulation
  while (any_active(df)) { # anyone left to serve
    next_one <- next_customer(df)
    now <- ifelse(next_one$arrival >= endtime, next_one$arrival, endtime)
    endtime <- now + next_one$duration
    df <- update_customer(df, next_one$custnum, endtime)
  }
  df <- mutate(df, totaltime = endtime - arrival)
  return(favstats(~ totaltime, data = df))
}

Profiling the teller simulation

Rprof("TellerSimProfile")
teller_sim()
Rprof(NULL)
head(summaryRprof("TellerSimProfile")$by.self, 20)

STILL slow? Try biglm

require(biglm)

n <- 20000
p <- 500
d <- as.data.frame(matrix(rnorm(n * (p + 1)), ncol = (p + 1)))
expl_vars <- paste(paste0("V", 2:(p+1)), collapse = " + ")
my_formula <- as.formula(paste("V1 ~ ", expl_vars))

# profile `lm` vs `biglm`
system.time(lm(my_formula, data = d))
   user  system elapsed 
  5.779   0.251   6.131 
system.time(biglm(my_formula, data = d))
   user  system elapsed 
  3.777   0.182   4.002 

Next step: parallel processing

my_cores <- detectCores()
my_cores
[1] 4
k <- 5

# without parallel processing
system.time(lapply(1:k, teller_sim))
   user  system elapsed 
 12.102   0.258  12.565 
# parallelize with 3 cores
system.time(mclapply(1:k, teller_sim, mc.cores = my_cores - 1))
   user  system elapsed 
 11.942   0.933  12.079 

MapReduce (parallelization that’s not embarassing?)

Hadoop & Spark

Interface with Spark

require(sparklyr)
# spark_install()   # only once per machine

Interface with Spark

# modify master to connect to a remote Spark cluster
sc <- spark_connect(master = "local")
* Using Spark: 2.4.0
class(sc)
[1] "spark_connection"       "spark_shell_connection" "DBIConnection"         
babynames_tbl <- 
  sc %>%
  copy_to(babynames::babynames, "babynames", overwrite = TRUE)

class(babynames_tbl)
[1] "tbl_spark" "tbl_sql"   "tbl_lazy"  "tbl"      

Counting Matthews

babynames_tbl %>%
  filter(name == "Matthew") %>%
  group_by(year) %>%
  summarise(N = n(), 
            total_births = sum(n)) %>%
  arrange(desc(total_births)) %>%
  head()
Missing values are always removed in SQL.
Use `SUM(x, na.rm = TRUE)` to silence this warning
This warning is displayed only once per session.

From dplyr to SQL

q <- 
  babynames_tbl %>%
  filter(name == "Matthew") %>%
  group_by(year) %>%
  summarise(N = n(), 
            total_births = sum(n)) %>%
  arrange(desc(total_births)) %>%
  head()

q
show_query(q)
<SQL>
SELECT `year`, count(*) AS `N`, SUM(`n`) AS `total_births`
FROM `babynames`
WHERE (`name` = "Matthew")
GROUP BY `year`
ORDER BY `total_births` DESC
LIMIT 6

Querying the Spark cluster

require(DBI)

dbGetQuery(conn = sc, statement = "
                         SELECT year, sum(1) as N, sum(n) as total_births
                         FROM babynames
                         WHERE name == 'Matthew'
                         GROUP BY year
                         ORDER BY total_births desc
                         LIMIT 6
                         ")

Modeling with Spark

require(macleish)

weather_tbl <- copy_to(sc, whately_2015, overwrite = TRUE)

weather_tbl %>%
  sparklyr::ml_linear_regression(rainfall ~ temperature + pressure + rel_humidity) %>%
  summary()
Deviance Residuals:
      Min        1Q    Median        3Q       Max 
-0.041290 -0.021761 -0.011632 -0.000576 15.968356 

Coefficients:
 (Intercept)  temperature     pressure rel_humidity 
   0.7177542    0.0004089   -0.0007545    0.0004377 

R-Squared: 0.004824
Root Mean Squared Error: 0.1982

Alternatives to SQL (Google BigQuery)

require(bigrquery)

project_id <- "stat-380-class-demo"   # Beckman's google cloud project ID

sql <- "SELECT word, count(distinct corpus) as numPlays, sum(word_count) as N
        FROM [publicdata:samples.shakespeare]
        GROUP BY word
        ORDER BY N desc
        LIMIT 10
        "
query_exec(query = sql, project = project_id)
Waiting for authentication in browser...
Press Esc/Ctrl + C to abort
Authentication complete.

Running job -:  1s:
Running job \:  2s:
Running job |:  2s:
Running job /:  3s:
Running job -:  3s:
Running job \:  3s:
Running job |:  4s:
Running job /:  4s:
Running job -:  5s:
Running job \:  5s:
Running job |:  5s:
Running job /:  5s:
                                                                               
10.0 megabytes processed

Rcpp

require(Rcpp)

# write a simple function in C++

cppFunction('int addemup(int x, int y, int z) {
  int sum = x + y + z;
  return sum;
}')


# R recognizes `addemup` like any other function
addemup
function (x, y, z) 
.Call(<pointer: 0x1176de8f0>, x, y, z)
addemup(2, 4, 6)
[1] 12

Stan

parameters {
  real y[2];
}
model {
  y[1] ~ normal(0, 1);
  y[2] ~ double_exponential(0, 2);
}
Loading required package: sp
library(rstan)
Loading required package: StanHeaders
rstan (Version 2.19.2, GitRev: 2e1f913d3ca3)
For execution on a local, multicore CPU with excess RAM we recommend calling
options(mc.cores = parallel::detectCores()).
To avoid recompilation of unchanged Stan programs, we recommend calling
rstan_options(auto_write = TRUE)

Attaching package: ‘rstan’

The following object is masked from ‘package:tidyr’:

    extract
fit <- sampling(ex1, cores = 3)
starting worker pid=25541 on localhost:11849 at 12:27:28.837
starting worker pid=25555 on localhost:11849 at 12:27:29.119
starting worker pid=25569 on localhost:11849 at 12:27:29.379

SAMPLING FOR MODEL 'stan-5a7c20ea0b45' NOW (CHAIN 1).
Chain 1: 
Chain 1: Gradient evaluation took 1.1e-05 seconds
Chain 1: 1000 transitions using 10 leapfrog steps per transition would take 0.11 seconds.
Chain 1: Adjust your expectations accordingly!
Chain 1: 
Chain 1: 
Chain 1: Iteration:    1 / 2000 [  0%]  (Warmup)
Chain 1: Iteration:  200 / 2000 [ 10%]  (Warmup)
Chain 1: Iteration:  400 / 2000 [ 20%]  (Warmup)
Chain 1: Iteration:  600 / 2000 [ 30%]  (Warmup)
Chain 1: Iteration:  800 / 2000 [ 40%]  (Warmup)
Chain 1: Iteration: 1000 / 2000 [ 50%]  (Warmup)
Chain 1: Iteration: 1001 / 2000 [ 50%]  (Sampling)
Chain 1: Iteration: 1200 / 2000 [ 60%]  (Sampling)
Chain 1: Iteration: 1400 / 2000 [ 70%]  (Sampling)
Chain 1: Iteration: 1600 / 2000 [ 80%]  (Sampling)
Chain 1: Iteration: 1800 / 2000 [ 90%]  (Sampling)

SAMPLING FOR MODEL 'stan-5a7c20ea0b45' NOW (CHAIN 2).
Chain 1: Iteration: 2000 / 2000 [100%]  (Sampling)
Chain 1: 
Chain 1:  Elapsed Time: 0.032539 seconds (Warm-up)
Chain 1:                0.031726 seconds (Sampling)
Chain 1:                0.064265 seconds (Total)
Chain 1: 
Chain 2: 
Chain 2: Gradient evaluation took 1.1e-05 seconds
Chain 2: 1000 transitions using 10 leapfrog steps per transition would take 0.11 seconds.
Chain 2: Adjust your expectations accordingly!
Chain 2: 
Chain 2: 
Chain 2: Iteration:    1 / 2000 [  0%]  (Warmup)
Chain 2: Iteration:  200 / 2000 [ 10%]  (Warmup)
Chain 2: Iteration:  400 / 2000 [ 20%]  (Warmup)
Chain 2: Iteration:  600 / 2000 [ 30%]  (Warmup)
Chain 2: Iteration:  800 / 2000 [ 40%]  (Warmup)
Chain 2: Iteration: 1000 / 2000 [ 50%]  (Warmup)
Chain 2: Iteration: 1001 / 2000 [ 50%]  (Sampling)
Chain 2: Iteration: 1200 / 2000 [ 60%]  (Sampling)
Chain 
SAMPLING FOR MODEL 'stan-5a7c20ea0b45' NOW (CHAIN 3).
2: Iteration: 1400 / 2000 [ 70%]  (Sampling)
Chain 3: 
Chain 3: Gradient evaluation took 9e-06 seconds
Chain 3: 1000 transitions using 10 leapfrog steps per transition would take 0.09 seconds.
Chain 3: Adjust your expectations accordingly!
Chain 3: 
Chain 3: 
Chain 3: Iteration:    1 / 2000 [  0%]  (Warmup)
Chain 2: Iteration: 1600 / 2000 [ 80%]  (Sampling)
Chain 2: Iteration: 1800 / 2000 [ 90%]  (Sampling)
Chain 2: Iteration: 2000 / 2000 [100%]  (Sampling)
Chain 2: 
Chain 2:  Elapsed Time: 0.033218 seconds (Warm-up)
Chain 2:                0.029568 seconds (Sampling)
Chain 2:                0.062786 seconds (Total)
Chain 2: 
Chain 3: Iteration:  200 / 2000 [ 10%]  (Warmup)
Chain 3: Iteration:  400 / 2000 [ 20%]  (Warmup)
Chain 3: Iteration:  600 / 2000 [ 30%]  (Warmup)
Chain 3: Iteration:  800 / 2000 [ 40%]  (Warmup)
Chain 3: Iteration: 1000 / 2000 [ 50%]  (Warmup)
Chain 3: Iteration: 1001 / 2000 [ 50%]  (Sampling)
Chain 3: Iteration: 1200 / 2000 [ 60%]  (Sampling)
Chain 3: Iteration: 1400 / 2000 [ 70%]  (Sampling)
Chain 3: Iteration: 1600 / 2000 [ 80%]  (Sampling)
Chain 3: Iteration: 1800 / 2000 [ 90%]  (Sampling)
Chain 3: Iteration: 2000 / 2000 [100%]  (Sampling)
Chain 3: 
Chain 3:  Elapsed Time: 0.033801 seconds (Warm-up)
Chain 3:                0.032489 seconds (Sampling)
Chain 3:                0.06629 seconds (Total)
Chain 3: 

SAMPLING FOR MODEL 'stan-5a7c20ea0b45' NOW (CHAIN 4).
Chain 4: 
Chain 4: Gradient evaluation took 6e-06 seconds
Chain 4: 1000 transitions using 10 leapfrog steps per transition would take 0.06 seconds.
Chain 4: Adjust your expectations accordingly!
Chain 4: 
Chain 4: 
Chain 4: Iteration:    1 / 2000 [  0%]  (Warmup)
Chain 4: Iteration:  200 / 2000 [ 10%]  (Warmup)
Chain 4: Iteration:  400 / 2000 [ 20%]  (Warmup)
Chain 4: Iteration:  600 / 2000 [ 30%]  (Warmup)
Chain 4: Iteration:  800 / 2000 [ 40%]  (Warmup)
Chain 4: Iteration: 1000 / 2000 [ 50%]  (Warmup)
Chain 4: Iteration: 1001 / 2000 [ 50%]  (Sampling)
Chain 4: Iteration: 1200 / 2000 [ 60%]  (Sampling)
Chain 4: Iteration: 1400 / 2000 [ 70%]  (Sampling)
Chain 4: Iteration: 1600 / 2000 [ 80%]  (Sampling)
Chain 4: Iteration: 1800 / 2000 [ 90%]  (Sampling)
Chain 4: Iteration: 2000 / 2000 [100%]  (Sampling)
Chain 4: 
Chain 4:  Elapsed Time: 0.019408 seconds (Warm-up)
Chain 4:                0.016486 seconds (Sampling)
Chain 4:                0.035894 seconds (Total)
Chain 4: 
print(fit)
Inference for Stan model: stan-5a7c20ea0b45.
4 chains, each with iter=2000; warmup=1000; thin=1; 
post-warmup draws per chain=1000, total post-warmup draws=4000.

      mean se_mean   sd  2.5%   25%   50%   75% 97.5% n_eff Rhat
y[1] -0.02    0.02 1.02 -2.01 -0.72 -0.01  0.67  2.01  1897    1
y[2]  0.10    0.06 2.79 -5.72 -1.28  0.06  1.47  6.03  2006    1
lp__ -1.50    0.03 1.22 -4.61 -2.06 -1.22 -0.62 -0.10  1329    1

Samples were drawn using NUTS(diag_e) at Wed Oct  2 12:27:32 2019.
For each parameter, n_eff is a crude measure of effective sample size,
and Rhat is the potential scale reduction factor on split chains (at 
convergence, Rhat=1).
# posterior
stan_plot(fit, point_est = "mean", show_density = TRUE, fill_color = "dodgerblue")
ci_level: 0.8 (80% intervals)
outer_level: 0.95 (95% intervals)

# trace
stan_trace(fit) +
  scale_color_manual(values = c("red", "blue", "green", "black"))
Scale for 'colour' is already present. Adding another scale for 'colour',
which will replace the existing scale.

Python in R Markdown

https://rstudio.github.io/reticulate/#python-in-r-markdown

reticulate & R Notebooks

Create object in R code chunk

require(reticulate)
dat <- c(180, 215, 210, 210, 188, 176, 209, 200)

Manipulate r.dat in Python code chunk

198.5

Access py$avg object in R code chunk

py$avg
[1] 198.5
